/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.test.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.manager.replication.WorkMaker;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import com.google.common.collect.Iterables;

@Ignore("Replication ITs are not stable and not currently maintained")
@Deprecated
public class WorkMakerIT extends ConfigurableMacBase {

  private AccumuloClient client;

  private static class MockWorkMaker extends WorkMaker {

    public MockWorkMaker(AccumuloClient client) {
      super(null, client);
    }

    @Override
    public void setBatchWriter(BatchWriter bw) {
      super.setBatchWriter(bw);
    }

    @Override
    public void addWorkRecord(Text file, Value v, Map<String,String> targets,
        TableId sourceTableId) {
      super.addWorkRecord(file, v, targets, sourceTableId);
    }

    @Override
    public boolean shouldCreateWork(Status status) {
      return super.shouldCreateWork(status);
    }

  }

  @Before
  public void setupInstance() throws Exception {
    client = Accumulo.newClient().from(getClientProperties()).build();
    ReplicationTable.setOnline(client);
    client.securityOperations().grantTablePermission(client.whoami(), ReplicationTable.NAME,
        TablePermission.WRITE);
    client.securityOperations().grantTablePermission(client.whoami(), ReplicationTable.NAME,
        TablePermission.READ);
  }

  @Test
  public void singleUnitSingleTarget() throws Exception {
    String table = testName.getMethodName();
    client.tableOperations().create(table);
    TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(table));
    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";

    // Create a status record for a file
    long timeCreated = System.currentTimeMillis();
    Mutation m = new Mutation(new Path(file).toString());
    m.put(StatusSection.NAME, new Text(tableId.canonical()),
        StatusUtil.fileCreatedValue(timeCreated));
    BatchWriter bw = ReplicationTable.getBatchWriter(client);
    bw.addMutation(m);
    bw.flush();

    // Assert that we have one record in the status section
    ReplicationTarget expected;
    try (Scanner s = ReplicationTable.getScanner(client)) {
      StatusSection.limit(s);
      assertEquals(1, Iterables.size(s));

      MockWorkMaker workMaker = new MockWorkMaker(client);

      // Invoke the addWorkRecord method to create a Work record from the Status record earlier
      expected = new ReplicationTarget("remote_cluster_1", "4", tableId);
      workMaker.setBatchWriter(bw);
      workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(timeCreated),
          Map.of("remote_cluster_1", "4"), tableId);
    }

    // Scan over just the WorkSection
    try (Scanner s = ReplicationTable.getScanner(client)) {
      WorkSection.limit(s);

      Entry<Key,Value> workEntry = Iterables.getOnlyElement(s);
      Key workKey = workEntry.getKey();
      ReplicationTarget actual = ReplicationTarget.from(workKey.getColumnQualifier());

      assertEquals(file, workKey.getRow().toString());
      assertEquals(WorkSection.NAME, workKey.getColumnFamily());
      assertEquals(expected, actual);
      assertEquals(workEntry.getValue(), StatusUtil.fileCreatedValue(timeCreated));
    }
  }

  @Test
  public void singleUnitMultipleTargets() throws Exception {
    String table = testName.getMethodName();
    client.tableOperations().create(table);

    TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(table));

    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";

    Mutation m = new Mutation(new Path(file).toString());
    m.put(StatusSection.NAME, new Text(tableId.canonical()),
        StatusUtil.fileCreatedValue(System.currentTimeMillis()));
    BatchWriter bw = ReplicationTable.getBatchWriter(client);
    bw.addMutation(m);
    bw.flush();

    // Assert that we have one record in the status section
    Set<ReplicationTarget> expectedTargets = new HashSet<>();
    try (Scanner s = ReplicationTable.getScanner(client)) {
      StatusSection.limit(s);
      assertEquals(1, Iterables.size(s));

      MockWorkMaker workMaker = new MockWorkMaker(client);

      Map<String,String> targetClusters =
          Map.of("remote_cluster_1", "4", "remote_cluster_2", "6", "remote_cluster_3", "8");

      for (Entry<String,String> cluster : targetClusters.entrySet()) {
        expectedTargets.add(new ReplicationTarget(cluster.getKey(), cluster.getValue(), tableId));
      }
      workMaker.setBatchWriter(bw);
      workMaker.addWorkRecord(new Text(file),
          StatusUtil.fileCreatedValue(System.currentTimeMillis()), targetClusters, tableId);
    }

    try (Scanner s = ReplicationTable.getScanner(client)) {
      WorkSection.limit(s);

      Set<ReplicationTarget> actualTargets = new HashSet<>();
      for (Entry<Key,Value> entry : s) {
        assertEquals(file, entry.getKey().getRow().toString());
        assertEquals(WorkSection.NAME, entry.getKey().getColumnFamily());

        ReplicationTarget target = ReplicationTarget.from(entry.getKey().getColumnQualifier());
        actualTargets.add(target);
      }

      for (ReplicationTarget expected : expectedTargets) {
        assertTrue("Did not find expected target: " + expected, actualTargets.contains(expected));
        actualTargets.remove(expected);
      }

      assertTrue("Found extra replication work entries: " + actualTargets, actualTargets.isEmpty());
    }
  }

  @Test
  public void dontCreateWorkForEntriesWithNothingToReplicate() throws Exception {
    String table = testName.getMethodName();
    client.tableOperations().create(table);
    String tableId = client.tableOperations().tableIdMap().get(table);
    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";

    Mutation m = new Mutation(new Path(file).toString());
    m.put(StatusSection.NAME, new Text(tableId),
        StatusUtil.fileCreatedValue(System.currentTimeMillis()));
    BatchWriter bw = ReplicationTable.getBatchWriter(client);
    bw.addMutation(m);
    bw.flush();

    // Assert that we have one record in the status section
    try (Scanner s = ReplicationTable.getScanner(client)) {
      StatusSection.limit(s);
      assertEquals(1, Iterables.size(s));

      MockWorkMaker workMaker = new MockWorkMaker(client);

      client.tableOperations().setProperty(ReplicationTable.NAME,
          Property.TABLE_REPLICATION_TARGET.getKey() + "remote_cluster_1", "4");

      workMaker.setBatchWriter(bw);

      // If we don't shortcircuit out, we should get an exception because
      // ServerConfiguration.getTableConfiguration
      // won't work with MockAccumulo
      workMaker.run();
    }

    try (Scanner s = ReplicationTable.getScanner(client)) {
      WorkSection.limit(s);

      assertEquals(0, Iterables.size(s));
    }
  }

}
